;; This file is part of GNUnet.
;; Copyright (C) 2012, 2018 GNUnet e.V.
;; Copyright (C) 2021 Maxime Devos
;;
;; GNUnet is free software: you can redistribute it and/or modify it
;; under the terms of the GNU Affero General Public License as published
;; by the Free Software Foundation, either version 3 of the License,
;; or (at your option) any later version.
;;
;; GNUnet is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;; Affero General Public License for more details.
;;
;; You should have received a copy of the GNU Affero General Public License
;; along with this program.  If not, see <http://www.gnu.org/licenses/>.
;;
;; SPDX-License-Identifier: AGPL3.0-or-later

;; Author: Florian Dold
;; Author: Christian Grothoff
;; Author: Maxime Devos

(define-module (tests mq))

(use-modules (ice-9 control)
	     (tests utils) ; for conservative-gc?
	     (fibers conditions)
	     (fibers)
	     (srfi srfi-1)
	     (srfi srfi-26)
	     (srfi srfi-39)
	     (srfi srfi-43)
	     (srfi srfi-64)
	     (srfi srfi-111)
	     ((rnrs base) #:select (assert mod))
	     ((rnrs exceptions) #:select (guard))
	     ((rnrs conditions) #:select (condition-who))
	     ((rnrs arithmetic bitwise)
	      #:select (bitwise-ior))
	     (gnu gnunet netstruct syntactic)
	     ((gnu gnunet netstruct procedural)
	      #:select (u32/big))
	     (gnu gnunet mq prio-prefs)
	     (gnu gnunet mq prio-prefs2)
	     (gnu gnunet util struct)
	     (gnu gnunet utils bv-slice)
	     ((gnu extractor enum)
	      #:select (symbol-value value->index))
	     (gnu gnunet message protocols)
	     (gnu gnunet mq)
	     (gnu gnunet mq envelope)
	     (gnu gnunet mq handler)
	     (quickcheck property)
	     (quickcheck)
	     (quickcheck arbitrary))

;; The client code sends the numbers 0 to
;; NUM_TRANSMISSIONS-1 over the message queue.
;; The notify-sent callback verifies whether
;; messages were sent in-order. The fake
;; ‘sender’ procedure verifies whether it received
;; the messages in order.
;;
;; Note that in more realistic situations, some
;; queueing can happen! A very special case
;; is being tested here.

(define NUM_TRANSMISSIONS 100)

(eval-when (expand load eval)
  (define-type /:msg:our-test:dummy
    (structure/packed
     (synopsis "A test message, containing an index")
     (documentation
      "The first time, a message with index 0 is sent.
Then each time the index is increased.")
     (field (header /:message-header))
     (field (index u32/big)))))

(define (index->dummy i)
  (let ((s (make-slice/read-write
	    (sizeof /:msg:our-test:dummy '()))))
    (set%! /:msg:our-test:dummy '(header type) s
	   (value->index (symbol-value message-type msg:util:dummy)))
    (set%! /:msg:our-test:dummy '(header size) s
	   (sizeof /:msg:our-test:dummy '()))
    (set%! /:msg:our-test:dummy '(index) s i)
    s))

(define (dummy->index s)
  (read% /:msg:our-test:dummy '(index) s))

(define (client mq notify-sent-box sent-box)
  (define (see i)
    (if (= i (unbox notify-sent-box))
	(set-box! notify-sent-box (+ 1 i))
	(error "messages were sent out-of-order (index: ~a) (notify-sent: ~a) (sent: ~a)"
	       i
	       (unbox notify-sent-box)
	       (unbox sent-box))))
  (do ((i 0 (+ 1 i)))
      ((>= i NUM_TRANSMISSIONS))
    (send-message! mq (index->dummy i)
		   #:notify-sent! (cut see i))))

(define (send-proc notify-sent-box sent-box envelope)
  (attempt-irrevocable-sent!
   envelope
   ((go message priority)
    (let ((index (dummy->index message)))
      (unless (= (+ index 1) (unbox notify-sent-box))
	(error "messages are being sent out-of-order or with queueing (index: ~a) (notify-sent: ~a) (sent: ~a)"
	       index
	       (unbox notify-sent-box)
	       (unbox sent-box)))
      (unless (= index (unbox sent-box))
	(error "dunno (index: ~a) (notify-sent: ~a) (sent: ~a)"
	       index
	       (unbox notify-sent-box)
	       (unbox sent-box)))
      (set-box! sent-box (+ 1 index))
      (values)))
   ((cancelled)
    (error "how did this cancelling happen?"))
   ((already-sent)
    (error "forgot to remove envelope from queue"))))

(define no-handlers (message-handlers))
(define (no-error-handler . what)
  (error "were did this error come from?"))

(test-equal "in-order, no queuing"
  (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS)
  (let* ((notify-sent-box (box 0))
	 (sent-box (box 0))
	 (mq (make-message-queue no-handlers
				 no-error-handler
				 (make-one-by-one-sender
				   (cut send-proc notify-sent-box sent-box <>)))))
    (client mq notify-sent-box sent-box)
    (list (unbox notify-sent-box) (unbox sent-box))))



;; Simulate buffering, by only ‘truly’ sending after each three messages.
;; This does _not_ test the queueing code! See the next test for that.
;; Make sure messages aren't lost, and they are still be sent in-order!
;;
;; (Assuming the sender is well-implemented. A buggy sender could send
;; things out-of-order.)

(define (send-proc2 notify-sent-box sent-box mod-box stashed envelope)
  (let ((first-free (vector-index not stashed))
	(expected-filled (unbox mod-box)))
    (unless (= (or first-free 0) expected-filled)
      (error "did we lose a message?"))
    (set-box! mod-box (mod (+ 1 expected-filled) (vector-length stashed)))
    (if (not first-free)
	(begin
	  (vector-map!
	   (lambda (i envelope)
	     (send-proc notify-sent-box sent-box envelope)
	     #f)
	   stashed)
	  (vector-set! stashed 0 envelope))
	;; @var{stashed} is not yet full; send the
	;; envelope later!
	(vector-set! stashed first-free envelope))
    (values)))

(define (expected-sent n k)
  (- n (let ((mod (mod n k)))
	 (if (= mod 0)
	     k
	     mod))))

(define k 3)

(test-equal "in-order, some buffering"
  (map (cut expected-sent <> 3)
       (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS))
  (let* ((notify-sent-box (box 0))
	 (sent-box (box 0))
	 (mod-box (box 0))
	 (stashed (make-vector k #f))
	 (mq (make-message-queue no-handlers
				 no-error-handler
				 (make-one-by-one-sender
				   (cut send-proc2 notify-sent-box sent-box mod-box stashed <>)))))
    (client mq notify-sent-box sent-box)
    (list (unbox notify-sent-box) (unbox sent-box))))



;; Test the queueing code by only flushing
;; the queue every N messages. Also check,
;; using flushing-allowed?, that sending
;; only happens when we expect it to happen.

(define flushing-allowed?
  (make-parameter #f))

(define (send-proc/check notify-sent-box sent-box envelope)
  (assert (flushing-allowed?))
  (send-proc notify-sent-box sent-box envelope))

(define (make-every-n proc k)
  "Make a sender using @var{proc} every @var{k}
invocations, and at other times doing nothing."
  ;; Should theoretically be an atomic, but the test is singly-threaded,
  ;; so don't bother.
  (define n-mod-k 0)
  (lambda (mq)
    (assert (not (flushing-allowed?)))
    (set! n-mod-k (+ 1 n-mod-k))
    (when (>= n-mod-k k)
      (set! n-mod-k 0)
      (parameterize ((flushing-allowed? #t))
	(proc mq)))))

(test-equal "in-order, some queueing"
  (map (cut expected-sent <> 3)
       (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS))
  (let* ((notify-sent-box (box 0))
	 (sent-box (box 0))
	 (mq (make-message-queue no-handlers
				 no-error-handler
				 (make-every-n
				  (make-one-by-one-sender
				   (cut send-proc/check notify-sent-box sent-box <>))
				  3))))
    (client mq notify-sent-box sent-box)
    (list (unbox notify-sent-box) (unbox sent-box))))



;; Test that concurrency interacts well with queueing.
;;
;; The situation we consider, is a number
;; of different threads concurrently sending messages.
;; The test verifies whether all messages were, in fact, sent.
;;
;; To make things complicated, some queueing is introduced.
;; The sender will only proceed each time the current thread
;; has tried to send @var{k/thread} messages, and the sender
;; will only try to send at most @code{(+ k/thread e)}, where
;; @var{e} is a random number from @var{e/min} to @var{e/max}.

;; The tests detect the following potential problems in the code
;; by crashing (but not always, so you may need to re-run a few
;; times, three times tends to be enough in practice for me):
;;
;;  * Replacing 'old' with 'queue' in
;;    unless (pfds:queue-empty? old)
;;  * Replacing 'old' with 'queue' in
;;    receive (envelope new) (pfds:dequeue old)
;;  * Replacing the first 'old' with 'queue' in
;;    (eq? old (swap! old new)), in 'make-one-by-one-sender'
;;  * Replacing the second 'old' with 'queue' in
;;    (eq? old (swap! old new)), in 'make-one-by-one-sender'
;;  * Replacing 'old' by 'queue' in
;;    (pfds:enqueue old envelope)
;;    (only detected infrequently, odds 1 to 7 or so)
;;  * Replacing the first 'old' by 'queue' in
;;    (eq? old (swap-queue! old new))
;;    in 'send-message!'
;;  * Replacing the second 'old' by 'queue' in
;;    (eq? old (swap-queue! old new))
;;    in 'send-message!'
;;
;; The following problems cause a hang when testing:
;;  * Replacing 'queue' by 'old' in (spin queue)
;;    in 'make-one-by-one-sender'
;;  * Replacing 'queue' by 'old' in (spin queue)
;;    in 'send-message!'.
;;
;; The following problems cause a hang in a preceding
;; test:
;;
;;  * Replacing the first 'old' by 'new' in
;;    (eq? old (swap-queue! old new))
;;    in 'send-message!'
;;  * Replacing 'queue' by 'old' in
;;    (spin queue)
;;    in 'send-message!'
;;  * Replacing 'queue' by 'new' in
;;    (spin queue)
;;    in 'send-message!'
;;
;; Some potential problems currently remain undetected:
;;  * Replacing the 'new' by 'queue' in
;;    (pfds:queue-length new)
;;
;;    However, it is only for printing a warning
;;    when the queue is rather full. Being slightly
;;    off in queue length shouldn't be a problem
;;    there, as the 'maximum reasonable bound'
;;    is just a wild guess and not some exact
;;    cut-off.
;;
;; Cancellation will be tested separately.

(define random/thread
  (fluid->parameter (make-unbound-fluid)))
(define k/thread 12)
(define e/min 2)
(define e/max 7)
(define N_MESSAGES 1000)
(define N_THREAD 40)

;; List of (thread-index . message-index)
;; received by current thread.
(define received/thread
  (fluid->parameter (make-unbound-fluid)))
(define i/thread
  (fluid->parameter (make-unbound-fluid)))

;; The sending is happening concurrently,
;; so in-order delivery cannot be guaranteed.
;; Thus, requesting in-order delivery seems
;; silly.
(define prio
  (bitwise-ior
   (prio->integer 'prio:background)
   (value->index (symbol-value priority-preference
			       pref:out-of-order))))

(eval-when (expand load eval)
  (define-type /:msg:our-test:concurrency
    (structure/packed
     (synopsis "A test message, containing an thread and message index")
     (documentation
      "The first time, a message with index 0 is sent.
Then each time the index is increased.")
     (field (header /:message-header))
     (field (index u32/big))
     (field (thread u32/big)))))

(define (make-thread-message thread-index i)
  (let ((s (make-slice/read-write
	    (sizeof /:msg:our-test:concurrency '()))))
    (set%! /:msg:our-test:concurrency '(header type) s
	   (value->index (symbol-value message-type msg:util:dummy)))
    (set%! /:msg:our-test:concurrency '(header size) s
	   (sizeof /:msg:our-test:concurrency '()))
    (set%! /:msg:our-test:concurrency '(index) s i)
    (set%! /:msg:our-test:concurrency '(thread) s thread-index)
    s))

(define (decode-thread-message s)
  (cons (read% /:msg:our-test:concurrency '(thread) s)
	(read% /:msg:our-test:concurrency '(index) s)))


(define (make-every-n/thread proc k)
  "Make a sender using @var{proc} every @var{k}
invocations, and at other times doing nothing.
@code{i/thread} is used for state."
  (lambda (mq)
    (assert (not (flushing-allowed?)))
    (i/thread (+ 1 (i/thread)))
    (when (>= (i/thread) k)
      (i/thread 0)
      (parameterize ((flushing-allowed? #t))
	(proc mq)))))

(define (thread mq thread-index)
  (parameterize ((received/thread '())
		 (i/thread 0)
		 (random/thread
		  (seed->random-state thread-index)))
    (do ((i 0 (+ 1 i)))
	((>= i N_MESSAGES))
      (send-message! mq (make-thread-message thread-index i)
		     #:priority prio))
    (received/thread)))

(define (make-restricted-sender how-many make-sender inner-proc)
  "Make a sender that, when called, tries to send @code{(how-many)}
messages, using @var{make-sender} and @var{inner-proc}."
  (define escape-thunk
    (fluid->parameter (make-unbound-fluid)))
  (define count
    (fluid->parameter (make-unbound-fluid)))
  (define max-count
    (fluid->parameter (make-unbound-fluid)))
  (define (count!)
    (count (+ 1 (count)))
    (when (= (count) (max-count))
      (count 0)
      ((escape-thunk))))
  (lambda (mq)
    (let/ec ec
      (parameterize ((max-count (how-many))
		     (count 0)
		     (escape-thunk ec))
	((make-sender
	  (lambda (envelope)
	    (inner-proc envelope)
	    ;; Check 'count' AFTER some things
	    ;; have been sent! Otherwise, the
	    ;; message is lost.
	    (count!)
	    (values)))
	 mq)))))

;; After all threads have exited, we'll ‘drain’ out
;; the left-overs.
(define drain? (make-parameter #f))

(define (make-sender/choice y? x y)
  "When @code{(y?)}, send with @code{y}. Else, send
with @code{x}."
  (lambda (mq)
    (if (y?)
	(y mq)
	(x mq))))

(define (inner-send envelope)
  (attempt-irrevocable-sent!
   envelope
   ((go message priority)
    (received/thread (cons (decode-thread-message message)
			   (received/thread)))
    (values))
   ((cancelled) (error "what/cancelled"))
   ((already-sent) (error "what/already-sent"))))

(define sender/thread
  (make-sender/choice
   drain?
   (make-every-n/thread
    (make-restricted-sender
     (lambda ()
       (+ k/thread e/min
	  (random (- e/max e/min -1) (random/thread))))
     make-one-by-one-sender
     inner-send)
    k/thread)
   (make-one-by-one-sender inner-send)))

(define (results->array per-thread-sent)
  ;; A bit array of messages the send function has
  ;; seen.
  (define a (make-typed-array 'b #f N_MESSAGES N_THREAD))
  (define (visit-message message)
    (define thread-index (car message))
    (define message-index (cdr message))
    (array-set! a #t message-index thread-index))
  (define (visit-per-thread _ messages)
    (for-each visit-message messages))
  (vector-for-each visit-per-thread per-thread-sent)
  a)

(define (array-missing a)
  (define missing '())
  (array-index-map! a
		    (lambda (i j)
		      (define found (array-ref a i j))
		      (unless found
			(set! missing `((,i . ,j) . ,missing)))
		      found))
  missing)

;; But possibly out-of-order!
(test-equal "nothing lost when sending concurrently"
  '()
  (let* ((mq (make-message-queue no-handlers
				 no-error-handler
				 sender/thread))
	 (thread-indices (iota N_THREAD))
	 ;; The ‘drained-out’ messages are put
	 ;; at index N_THREAD.
	 (results (make-vector (+ 1 N_THREAD)))
         (done? (vector-unfold (lambda (_) (make-condition)) N_THREAD))
	 (ready? (make-condition)))
    (run-fibers
     (lambda ()
       (define (run! thread-index)
	 (spawn-fiber
	  (lambda ()
	    (wait ready?)
	    (vector-set! results thread-index
			 (thread mq thread-index))
            (signal-condition! (vector-ref done? thread-index)))))
       (for-each run! thread-indices)
       ;; Try to start every thread at the same time!
       (signal-condition! ready?)
       ;; #:drain? #t with parallelism is broken,
       ;; see <https://github.com/wingo/fibers/issues/47>.
       ;; So explicitely wait on each fiber.
       (vector-for-each (lambda (_ c) (wait c)) done?))
     #:drain? #t
     ;; No need
     #:install-suspendable-ports? #f
     ;; More interrupts --> more switches
     ;; --> more test coverage. At least,
     ;; that's the idea. Not really tested.
     #:hz 700)
    ;; Drain the left-overs.
    (parameterize ((drain? #t)
		   (received/thread '()))
      (try-send-again! mq)
      (vector-set! results N_THREAD (received/thread)))
    (array-missing (results->array results))))



;; Test message injection / handling (no exceptions).

(define mhp (vector-unfold (lambda (_) (make-parameter #f)) 4))
(define mhv (vector-unfold (lambda (_) (make-parameter #f)) 4))
(define mh (apply message-handlers
                  (map (lambda (i)
                         (make-message-handler i (lambda (p) (p))
                                               (lambda _
                                                 (apply ((vector-ref mhv i)) _))
                                               (lambda _
                                                 (apply ((vector-ref mhp i)) _))))
                       (iota (vector-length mhp)))))

;; FWIW, passing #f is not really allowed.
(define mq (make-message-queue mh #f #f))

(test-eq "when injecting, handled message is eq?"
  #t
  (let ((m (make-slice/read-write 40))) ; could as wel have been 20
    (set%! /:message-header '(size)
	   (slice-slice m 0 (sizeof /:message-header '())) 40)
    (let/ec ec
      (parameterize (((vector-ref mhp 0)
                      (lambda (x)
                        (ec (eq? x m))))
                     ((vector-ref mhv 0)
                      (lambda (x)
                        (assert (eq? x m))
                        #t)))
        (inject-message! mq m)
        'unreachable))))

(test-eq "non-zero types ok"
  #t
  (let ((s (make-slice/read-write (sizeof /:message-header '()))))
    (set%! /:message-header '(type) s 3)
    (set%! /:message-header '(size) s (sizeof /:message-header '()))
    (let/ec ec
      (parameterize (((vector-ref mhp 3)
                      (lambda (x)
                        (ec (eq? x s))))
                     ((vector-ref mhv 3)
                      (lambda (x)
                        (assert (eq? s x))
                        #t)))
        (inject-message! mq s)
        'unreachable))))

(test-equal "verifier & handler only called once"
  '(1 . 1)
  (let ((hcount 0)
        (vcount 0)
        (s (make-slice/read-write (sizeof /:message-header '()))))
    (set%! /:message-header '(size) s (sizeof /:message-header '()))
    (parameterize (((vector-ref mhp 0)
                    (lambda (x)
                      (set! hcount (+ 1 hcount))
                      (assert (eq? x s))
                      (values)))
                   ((vector-ref mhv 0)
                    (lambda (x)
                      (set! vcount (+ 1 vcount))
                      (assert (eq? x s))
                      #t)))
      (inject-message! mq s)
      (cons hcount vcount))))



;; Test message injection (exceptions)
(test-equal "missing header error"
  (map (lambda (i)
	 `(missing-header-error (size . ,i)
				(who  . inject-message!)))
       (iota (sizeof /:message-header '())))
  (map (lambda (i)
	 (guard (e ((missing-header-error? e)
		    `(missing-header-error
		      (size . ,(missing-header-error-received-size e))
		      (who . ,(condition-who e)))))
	   (inject-message! mq (make-slice/read-write i))
	   'unreachable))
       (iota (sizeof /:message-header '()))))

(test-assert "[prop] wrong header size error"
  (quickcheck
   (property ((%real-length $natural)
	      (supposed-length $natural))
     (let* ((real-length (+ (sizeof /:message-header '())
			    %real-length))
	    (supposed-length (if (= real-length supposed-length)
				 (+ 1 supposed-length)
				 supposed-length))
	    (s (make-slice/read-write real-length))
	    (sheader (slice-slice s 0 (sizeof /:message-header '()))))
       (set%! /:message-header '(size)
	      (slice-slice s 0 (sizeof /:message-header '()))
	      supposed-length)
       (guard (e ((size-mismatch-error? e)
		  (equal? `(,(size-mismatch-error-expected-size e)
			    ,(size-mismatch-error-received-size e)
			    ,(condition-who e))
			  `(,supposed-length
			    ,real-length
			    inject-message!))))
	 (inject-message! mq s)
	 #f)))))

;; TODO: what if the message is (otherwise) malformed?



;; Test the following part of the send-message! docstring:
;; ‘After normal execution, the message envelope is returned,
;; but in case of an exception (for example, an out-of-memory exception
;; during the handling of a @code{&overly-full-queue-warning}), it is
;; possible the envelope isn't returned even though it has been enqueued
;; and it might perhaps be sent.
(test-assert "returned envelope and sent envelope are equal"
  (let* ((returned-values #f)
	 (sent-values #f)
	 (sender
	  (make-one-by-one-sender
	   (lambda envelope-arguments
	     (assert (eq? sent-values #f))
	     (set! sent-values envelope-arguments)
	     (values))))
	 (mq (make-message-queue #f #f sender))
	 (msg (index->dummy #xdeadbeef)))
    (call-with-values
	(lambda () (send-message! mq msg))
      (lambda return-values
	(set! returned-values return-values)))
    (and (equal? sent-values returned-values)
	 (= (length sent-values) 1)
	 (every envelope? sent-values))))

;; Strictly speaking, this test is allowed to fail
;; (as it is only ‘might’, not ‘it must be possible’),
;; but it seems a good idea to check our understanding is correct.
(test-assert "message might be enqueued & sent but not returned"
  (let* ((enqueued? #f)
	 (flush? (make-parameter #f))
	 (sender/flush
	  (make-one-by-one-sender
	   (lambda (envelope)
	     (set! enqueued? envelope)
	     (values))))
	 (sender/hold
	  (lambda _ (values)))
	 (sender (make-sender/choice flush? sender/hold
				     sender/flush))
	 (mq (make-message-queue #f #f sender))
	 (msg (index->dummy 0))
	 (exceptional #f)
	 (enveloped #f))
    (with-exception-handler
	(lambda (_)
	  (assert exceptional)
	  (assert (envelope? enqueued?))
	  (assert (not enveloped)))
      (lambda ()
	(with-exception-handler
	    (lambda (e)
	      (if (overly-full-queue-warning? e)
		  (begin
		    (set! exceptional #t)
		    (parameterize ((flush? #t))
		      (try-send-again! mq)
		      ;; At least in the current implementation,
		      ;; this holds.
		      ;;
		      ;; In a different implementation, the
		      ;; envelope could be enqueued after
		      ;; checking the queue length.
		      (assert enqueued?))
		    (throw 'out-of-memory))
		  (raise-exception e #:continuable? #t)))
	  (lambda ()
	    (call-with-values
		(lambda ()
		  (parameterize ((%suspicious-length 0))
		    (send-message! mq msg)))
	      (lambda args (set! enveloped args))))
	  #:unwind? #f))
      #:unwind? #t
      #:unwind-for-type 'out-of-memory)
    (and enqueued? exceptional
	 (not enveloped))))



;; Message cancellation.
;;
;; Cancellation is already tested in tests/envelope.scm.
;; However, the interaction with message queues has not
;; yet been tested.

;; This test detected (not detected by previous tests):
;;   * the cdr of the contents of messages+garbage/box
;;     being initialised incorrectly in make-message-queue
;;   * using car instead of cdr in increment-garbage&maybe-cleanup

(test-assert "envelopes do not keep a strong reference to the message queue"
  (let* ((mq (make-message-queue #f #f (lambda _ (values))))
	 (mq-guard (make-guardian))
	 (envelope (send-message! mq (index->dummy 0))))
    (mq-guard mq)
    (attempt-cancel!
     envelope
     ((now-cancelled)
      (gc)
      (->bool (mq-guard)))
     ((already-cancelled) (error "what/cancelled"))
     ((already-sent) (error "what/sent")))))

(define (count-guardian/cancelled guardian)
  "Count how many elements are present in @var{guardian}.
While we're at it, verify each element is a cancelled envelope."
  (let loop ((n 0))
    (let ((e (guardian)))
      (cond ((not e) n)
	    ((envelope-peek-cancelled? e) (loop (+ n 1)))
	    (#t (error "a not-cancelled envelope was freed!"))))))

(define (count-guardian/uncancelled guardian)
  "Count how many elements are present in @var{guardian}.
While we're at it, verify each element is an uncancelled envelope."
  (let loop ((n 0))
    (let ((e (guardian)))
      (cond ((not e) n)
	    ((not (envelope-peek-cancelled? e)) (loop (+ n 1)))
	    (#t (error "a cancelled envelope was freed!"))))))

;; This is a variant of
;; "the one-by-one message sender removes cancelled envelopes",
;; using guardians, and purely testing the cancelling code, and
;; not the sending code.
;;
;; It detects the following mutations:
;;   * removing (spin queue+garbage) after swap! in the 'envelope-peek-cancelled?'
;;     branch of 'make-one-by-one-sender'
(test-assert "cancelling envelopes eventually frees memory even if message sender is dead"
  (let* ((mq (make-message-queue #f #f (lambda _ (values))))
	 (cancelled-guard (make-guardian))
	 (uncancelled-guard (make-guardian)))
    ;; Add a bunch of messages.
    (let ((messages
	   (map (lambda (i)
		  (send-message! mq (index->dummy i)))
		(iota 50))))
      ;; Cancel most of them.  This should trigger collection of
      ;; cancelled envelopes.
      (for-each
       (lambda (e)
	 (cancelled-guard e)
	 (attempt-cancel!
	  e
	  ((now-cancelled) (values))
	  ((already-cancelled) (error "what/cancelled"))
	  ((already-sent) (error "what/sent"))))
       (list-head messages 40)))
    ;; Move freed envelopes to the guardian.
    (gc)
    ;; How many were freed?
    (let ((freed/cancelled (count-guardian/cancelled cancelled-guard))
	  (freed/uncancelled (count-guardian/uncancelled uncancelled-guard))
	  (cancelled 40)
	  (total 50))
      (pk 'total total 'cancelled cancelled 'freed/cancelled freed/cancelled
	  'freed/uncancelled freed/uncancelled
	  'queue-length (message-queue-length mq))
      ;; Only cancelled messages were supposed to be freed.
      (assert (= freed/uncancelled 0))
      (assert (<= freed/cancelled cancelled))
      ;; A large fraction of cancelled messages should be freed.
      (assert (>= (/ freed/cancelled cancelled) 7/8))
      ;; If the GC is exact, all messages removed from the message
      ;; queue (due to cancelling) should be removed.
      (unless (conservative-gc?)
	(assert (= freed/cancelled (- total (message-queue-length mq)))))
      #t)))

(define sender/no-cancelled
  (make-one-by-one-sender
   (lambda (e)
     (pk 'ee e)
     (assert (not (envelope-peek-cancelled? e)))
     (values))))

;; Not strictly necessary (and also undocumented), but this should
;; improve the accuracy of the garbage counter. Maybe not trying
;; to send useless (cancelled) envelopes could help with performance
;; as well (untested)?
;;
;; Also, this  caught a bug in (gnu gnunet mq) -- the procedure returned
;; by 'make-one-by-one-sender' went into an infinite loop if it encountered
;; a cancelled envelope.
;;
;; This tests detects negating the test
;;   (eq? old (swap! old (cons old-queue incremented-garbage)))
;; in increment-garbage&maybe-cleanup.

(test-assert "the one-by-one message sender removes cancelled envelopes"
  (let* ((flush? (make-parameter #f))
	 (sender (make-sender/choice flush? (lambda _ (values))
				     sender/no-cancelled))
	 (mq (make-message-queue #f #f sender)))
    ;; Fill the queue with many uncancelled messages, such that
    ;; the logic for collecting cancelled envelopes doesn't kick in too early.
    (do ((i 0 (+ i 1)))
	((>= i 30))
      (send-message! mq (index->dummy i)))
    (assert (= (message-queue-length mq) 30))
    ;; Now add some envelopes to the queue & cancel them.
    (do ((i 0 (+ i 1)))
	((>= i 4))
      (attempt-cancel!
       (send-message! mq (index->dummy (+ 30 i)))
       ((now-cancelled) (values))
       ((already-cancelled) (error "what / cancelled"))
       ((already-sent) (error "what / sent"))))
    (assert (= (message-queue-length mq) 34))
    (parameterize ((flush? #t))
      (try-send-again! mq))
    (assert (= (message-queue-length mq) 0))
    (assert (= (%message-queue-garbagitude mq) 0))
    #t))

;; This is a variation of "nothing lost when sending concurrently",
;; but for cancelation.
;;
;; This test fails in case of the following mutations:
;;   * replace 0 with 1 in (or some other number) in
;;     (swap! old (cons filtered 0))
;;     in increment-garbage&maybe-cleanup
(test-assert "the (approximate) cancellation count is accurate, when not sending, even when cancelling concurrently (also, uncancelled messages are not lost)"
  (let* ((messages/cancellation 10000)
	 (n/not-cancelled #f)
	 (flush? (make-parameter #f))
	 (sender/check (lambda (e)
			 (unless (envelope-peek-cancelled? e)
			   (set! n/not-cancelled (+ 1 n/not-cancelled)))
			 (values)))
	 (sender (make-sender/choice flush?
				     (lambda _ (values))
				     (make-one-by-one-sender sender/check)))
	 (mq (make-message-queue #f #f sender))
	 (ready? (make-condition))
	 (done? (vector-unfold
		 (lambda (_) (make-condition))
		 (/ messages/cancellation 2)))
	 (messages
	  (with-exception-handler
	      (lambda (e)
		(if (overly-full-queue-warning? e)
		    (values)
		    (raise-exception e #:continuable? #t)))
	    (lambda ()
	      (vector-unfold (compose (cut send-message! mq <>)
				      index->dummy)
			     messages/cancellation)))))
    (run-fibers
     (lambda ()
       ;; Cancel half of the messages, concurrently.
       ;; Only half of all the messages are cancelled,
       ;; to avoid resetting the garbage counter.
       (vector-for-each
	(lambda (i done? message)
	  (when (< i (/ messages/cancellation 2))
	    (spawn-fiber
	     (lambda ()
	       (wait ready?)
	       (attempt-cancel!
		message
		((now-cancelled)
		 (signal-condition! done?)
		 (values))
		((already-cancelled)
		 (signal-condition! done?)
		 (error "what/cancelled"))
		((already-sent)
		 (signal-condition! done?)
		 (error "what/sent")))))))
	done? messages)
       (signal-condition! ready?)
       (vector-for-each (lambda (_ c) (wait c)) done?))
     #:hz 4000)
    ;; Verify the estimate is accurate, at least in this
    ;; situation.
    (assert (= (pk 'garbagitude (%message-queue-garbagitude mq))
	       (pk 'expected (/ messages/cancellation 2))))
    ;; Cancel more messages (until 7/8 are cancelled),
    ;; to trigger collection. While we're at, verify
    ;; the estimate is still correct.
    (do ((i (/ messages/cancellation 2) (+ i 1)))
	((>= (/ i messages/cancellation) 7/8))
      (attempt-cancel!
       (vector-ref messages (pk 'iiii i))
       ((now-cancelled)
	;; 3/4 is the (arbitrary) ratio at which
	;; the garbage is thrown out
	(if (< (* 4 i) (* 3 messages/cancellation))
	    (assert (= (%message-queue-garbagitude mq)
		       (+ i 1)))
	    (assert (= (%message-queue-garbagitude mq)
		       (- i (* 3/4 messages/cancellation))))))
       ((already-cancelled) (error "what/cancelled2"))
       ((already-sent) (error "what/sent2"))))
    ;; Now send the envelopes, to verify uncancelled messages
    ;; are still in the queue.
    (parameterize ((flush? #t))
      (set! n/not-cancelled 0)
      (try-send-again! mq))
    (assert (= n/not-cancelled (* 1/8 messages/cancellation)))
    ;; As everything has been removed from the queue,
    ;; the estimate should now be zero.
    (assert (= (pk 'final-garbagitude (%message-queue-garbagitude mq))
	       0))
    #t))

